package com.ml4ai.m2s.web

import java.lang
import java.util.concurrent.ArrayBlockingQueue
import java.util.function

import com.ml4ai.core.stack.mq.RabbitMQAgent
import com.ml4ai.m2s.service.RabbitMQService
import javax.servlet.http.HttpServletResponse
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.{PathVariable, RequestMapping}
import com.ml4ai.core.json.stringJson._
import com.rabbitmq.client.Channel

@Controller
@RequestMapping(Array("/api/stream"))
class StreamHandlerControl {

  @Autowired
  var rabbitMQService: RabbitMQService = null

  @RequestMapping(value = Array("/{streamId}"))
  def connect(@PathVariable(name = "streamId") streamId: String, response: HttpServletResponse): Unit = {
    val os = response.getOutputStream
    val signal = new ArrayBlockingQueue[Boolean](1)
    val rabbitMQAgent: RabbitMQAgent = rabbitMQService.build
    val info = new java.util.LinkedList[(String, Channel)]
    val (queue, channel) = rabbitMQAgent.attachInterimConsume(1, new function.Function[Array[Byte], java.lang.Boolean] {
      override def apply(t: Array[Byte]): lang.Boolean = {
        try {
          if (t.length > 0) {
            os write t
            os.flush
          } else {
            os.close
            val removeCmd = "{}".addString("command", "remove.queue").addString("queue", info.get(0)._1).addString("callback", "")
            rabbitMQAgent.produceText("", "command", removeCmd, true)
            signal.offer(true)
          }
          true
        } catch {
          case e: Exception => {
            val removeCmd = "{}".addString("command", "remove.queue").addString("queue", info.get(0)._1).addString("callback", "")
            rabbitMQAgent.produceText("", "command", removeCmd, true)
            signal.offer(false)
            throw new IllegalStateException(e)
          }
        }
      }
    })
    info.add((queue, channel))
    val requestStreamCmd = "{}".addString("command", "create.stream").addString("id", streamId).addString("callback", queue);
    rabbitMQAgent.produceText("", "command", requestStreamCmd, true)
    signal.take
  }


}
